package com.atguigu.flink.datastreamapi.combine;

import com.atguigu.flink.pojo.OrderEvent;
import com.atguigu.flink.pojo.TxEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Demo job: reconciles paid orders against payment receipts with connected streams.
 *
 * <p>Reads {@code data/OrderLog.csv} and {@code data/ReceiptLog.csv} line by line, maps each
 * line to an {@link OrderEvent} / {@link TxEvent}, keeps only the orders whose type is
 * {@code "pay"}, connects both streams, keys them by txId and prints one line for every
 * order whose txId was seen on both sides.
 *
 * Created by Smexy on 2023/11/13
 */
public class Demo3_ConnectExec
{
    /**
     * Builds and runs the reconciliation pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Read the raw lines of both files; they are mapped to model objects below.
        FileSource<String> source1 = FileSource.forRecordStreamFormat(
                                                  new TextLineInputFormat(), new Path("data/OrderLog.csv"))
                                              .build();

        FileSource<String> source2 = FileSource.forRecordStreamFormat(
                                                  new TextLineInputFormat(), new Path("data/ReceiptLog.csv"))
                                              .build();

        SingleOutputStreamOperator<OrderEvent> orderDs = environment
            .fromSource(source1, WatermarkStrategy.noWatermarks(), "order")
            .map(new MapFunction<String, OrderEvent>()
            {
                // Expects four comma-separated columns, the last one numeric.
                // NOTE(review): a line with fewer columns or a non-numeric fourth column
                // makes this mapper throw - confirm the input file is always well-formed.
                @Override
                public OrderEvent map(String value) throws Exception {
                    String[] words = value.split(",");
                    return new OrderEvent(
                        words[0],
                        words[1],
                        words[2],
                        Long.valueOf(words[3])
                    );
                }
            })
            // Keep only the orders that have already been paid.
            .filter(o -> "pay".equals(o.getType()));

        SingleOutputStreamOperator<TxEvent> txDs = environment
            .fromSource(source2, WatermarkStrategy.noWatermarks(), "tx")
            .map(new MapFunction<String, TxEvent>()
            {
                // Expects three comma-separated columns, the last one numeric.
                @Override
                public TxEvent map(String value) throws Exception {
                    String[] words = value.split(",");
                    return new TxEvent(
                        words[0],
                        words[1],
                        Long.valueOf(words[2])
                    );
                }
            });

        // Connect the two streams first.
        ConnectedStreams<OrderEvent, TxEvent> connectDS = orderDs.connect(txDs);

        // Both sides must be keyed by txId: only then are records with the same txId sent to
        // the same downstream task, which is what makes reconciliation possible at all.
        connectDS
            .keyBy(OrderEvent::getTxId,TxEvent::getTxId)
            /*
                1. KeyedCoProcessFunction has two callbacks, each handling its own input type.
                   Any data exchanged between them has to go through fields of the function.

                2. The two streams progress at different speeds, so the OrderEvent and the
                   TxEvent of one payment may not arrive together. Both sides therefore do
                   the same thing on arrival:
                       - look the txId up in the OTHER side's cache;
                       - found:     reconciliation succeeded, and the cached counterpart is
                                    consumed (removed) so the caches only hold unmatched data;
                       - not found: store itself in its own cache and wait for the other side.

                3. Cached data
                       OrderEvent: txId -> oId
                       TxEvent:    txId
                   kept in plain collections.
             */
            .process(new KeyedCoProcessFunction<String, OrderEvent, TxEvent, String>()
            {
                // Unmatched orders: txId -> oId.
                // NOTE(review): these are ordinary in-memory collections, not Flink managed
                // keyed state, so presumably they are not part of checkpoints - confirm whether
                // that is acceptable outside of a demo.
                private final Map<String,String> orderEventCache = new HashMap<>();
                // txIds of receipts that are still waiting for their order.
                private final Set<String> txEventCache = new HashSet<>();

                // Handles an OrderEvent.
                @Override
                public void processElement1(OrderEvent value,Context ctx, Collector<String> out) throws Exception {
                    String txId = value.getTxId();
                    // 1. Try to consume the matching receipt. remove() returns true only when
                    //    the txId was cached, and evicts it so a matched receipt is neither
                    //    kept forever nor matched a second time.
                    if (txEventCache.remove(txId)){
                        out.collect(value.getOId() + "对账成功...");
                    }else {
                        // 2. No receipt yet: cache this order until the receipt arrives.
                        orderEventCache.put(txId,value.getOId());
                    }
                }

                // Handles a TxEvent.
                @Override
                public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception {
                    String txId = value.getTxId();
                    // 1. Try to consume the matching order. remove() returns the cached oId
                    //    (or null when absent) and evicts the entry in the same step.
                    String oId = orderEventCache.remove(txId);
                    if (oId != null){
                        out.collect(oId + "对账成功...");
                    }else {
                        // 2. No order yet: cache this receipt until the order arrives.
                        txEventCache.add(txId);
                    }
                }
            })
            .print();

        environment.execute();
    }
}
